package com.ruoyi.iot.exchange.network.tcp.handler;



import com.ruoyi.iot.exchange.network.tcp.cache.TcpChannelCache;
import com.ruoyi.iot.exchange.network.tcp.notify.DeviceStateNotify;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public  class TCPSplitFrameDecoder extends ByteToMessageDecoder implements Serializable {


    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,8,0L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));



    TCPDispatchHandlerGatherProxy tcpDispatchHandlerGatherProxy;
    public TCPSplitFrameDecoder(TCPDispatchHandlerGatherProxy tcpDispatchHandlerGatherProxy ){
        this.tcpDispatchHandlerGatherProxy = tcpDispatchHandlerGatherProxy;
    }


    /**
     * 分发不同的通信协议
     * @param channelHandlerContext 通道
     * @param byteBuf 缓存
     * @param list 转移的数据对象
     */
    //分发不同的通信协议
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        log.info("ctx: decode!");
        tcpDispatchHandlerGatherProxy.injectByteBuf(channelHandlerContext,byteBuf);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        //log.info("ctx: channelRead!");
        poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                tcpDispatchHandlerGatherProxy.decodeFrame();
            }
        });
    }



    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("ctx:" + ctx.toString() + " connect!");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if( TcpChannelCache.getDevId(ctx) != null) {
            log.info(String.format("ctx:%s close", TcpChannelCache.getDevId(ctx)));
        }
        log.info("ctx:" + ctx.toString() + " disconnect!");
        closeEvent(ctx);
        super.channelInactive(ctx);
    }

    /**
     * 关闭事件
     * @param ctx
     */
    private void closeEvent(ChannelHandlerContext ctx){
        String devId = TcpChannelCache.getDevId(ctx);
        TcpChannelCache.removeChannel(ctx);
        if(StringUtils.isNotEmpty(devId)) {
            DeviceStateNotify.getInstance().offline(devId);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        closeEvent(ctx);
    }





}
